package com.offcn.bigdata.spark.p2

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * rdd的行动算子
  *    count
  *    foreach
  *
  *    countByValue
  *    countByKey
  *    take
  *    first
  *    collect
  *    reduce
  *    saveAsxxx
  *    foreachPartition
  */
object _04ActionOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                        .setAppName("ActionOps")
                        .setMaster("local[*]")
        val sc = new SparkContext(conf)

        val lines = sc.parallelize(List(
            "hello you",
            "hello me",
            "hello lan lan"
        ))

        val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))
        var ret:RDD[(String, Int)] = pairs.reduceByKey(_+_)

        //countByKey 统计每一个key出现的次数
        val kvs = pairs.countByKey()
        println(kvs)
        println("------------countByValue------------")//也是按照value来统计出现了几次k-v键值对
        val cbv = pairs.countByValue()
        cbv.foreach{case ((key, value), longVal) => {
            println(s"key: ${key}, value: ${value}, longVal: ${longVal}")
        }}
        println("------------take------------")
        /*
            如果某一个分区能够满足num数据，直接从这个分区获取，反之从多个分区中共同返回num条记录
         */
        var listRDD = sc.parallelize(1 to 10, 2)
        listRDD = listRDD.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"rdd中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.toIterator
        })
        val take3:Array[Int] = listRDD.take(6)
        take3.foreach(println)
        println("------------first------------")
        val firstValue = listRDD.first()
        println("firstvalue:" + firstValue)
        println("------------collect------------")
        /*
            collect: 意为收集的含义，指的是把分布在各个executor对应的partition中的数据
            都拉取到driver端，返回值是一个集合
            需要注意：如果把所有的partition中的数据都拉回到driver，是很容易造成driver的oom
            所以以后再工作中不能随随便便的执行collect，可以试用贴take，或者在collect之前先filter
         */
        val array = listRDD.collect()
        array.foreach(println)
        println("------------reduce------------")
        /*
            reduceByKey是一个transformation，
            reduce是一个action操作
         */
        val sum = listRDD.reduce(_+_)
        println("sum: " + sum)
        println("------------saveAsXxx----------------")
        //text就是saveAsHadoopFile的简写
//        ret.saveAsTextFile("file:/E:/data/out/spark/wc-text")
        //object ---> saveAsSequenceFile的简写
//        ret.saveAsObjectFile("file:/E:/data/out/spark/wc-obj")
        /**
          * saveAsNewAPIHadoopFile
          * saveAsHadoopFile的区别
          *     只有一个地方不同：OutputFormat不同
          *
          *     saveAsHadoopFile是比较早的版本
          *         对应的OutputFormat是：org.apache.hadoop.mapred.OutputFormat 是一个接口
          *     saveAsNewAPIHadoopFile是新的版本
          *         对应的OutputFormat是：org.apache.hadoop.mapreduce.OutputFormat是一个抽象类
          * 以 saveAsNewAPIHadoopFile为例说明
          *   path: String, 输出路径
              keyClass: Class[_], rdd中的key的类型
              valueClass: Class[_], 要输出的rdd的value的类型
              outputFormatClass: Class[_ <: NewOutputFormat[_, _]], 输出的outputFormat类型
              conf: Configuration = self.context.hadoopConfiguration     有默认值
          */
        ret.saveAsNewAPIHadoopFile(
            path = "file:/E:/data/out/spark/wc-hadoop",
            keyClass = classOf[Text],
            valueClass = classOf[IntWritable],
            outputFormatClass = classOf[TextOutputFormat[Text, IntWritable]]
        )
        sc.stop()
    }
}
